package com.cloud.demo.smpp.socket;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import com.cloud.demo.smpp.msg.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloud.demo.bean.SendMsgMo;
import com.cloud.demo.bean.SendMsgReport;
import com.cloud.demo.bean.SendMsgReq;
import com.cloud.demo.redis.RedisDao;
import com.cloud.demo.utils.ByteConvert;
import com.cloud.demo.utils.Constants;
import com.cloud.demo.utils.DateUtils;
import com.cloud.demo.utils.GenerateId;
import com.cloud.demo.utils.JacksonUtil;
import com.cloud.demo.utils.MsgUtils;
import com.cloud.demo.utils.SpringConfigTool;


/**
 * ClassName:SmgpContainerBj <br/>
 * Function: 北京电信<br/>
 * Reason:	 TODO ADD REASON. <br/>
 * Date:     2016年2月25日 下午4:16:41 <br/>
 * @author   LiHao
 * @version  
 * @since    JDK 1.6
 * @see 	 
 */
public class SmppContainer {
	private static Logger logger = LoggerFactory.getLogger(SmppContainer.class);
	private static SmppContainer container=null;
	private volatile static Socket msgSocket;
	private static int timeout = 1000 * 60; // 等待响应超时时间，初始值20秒
	private static boolean readFlag=true; 
	private static String host;
	private static int port;
	private static String clientId;
	private static String secret; 
	private static InputStream inStream = null;
	private static OutputStream outStream = null;
	private static java.util.Timer heartTimer = null;
	private static boolean isLaunchHeartcheck = false;// 是否已启动心跳检测
	private static boolean isNetworkConnect = false; // 网络是否已连接
	private final static ConcurrentHashMap<Integer, Object> recMsgMap = new ConcurrentHashMap<Integer, Object>();
	private static Thread receiveThread = null;
	private final static ReentrantLock lock = new ReentrantLock();
	private static boolean isReConnection=false; //重连标识
	private static String spCode; //下发号码
	private static String spId;
	private static boolean closeFlag=false; //升级时,调用接口主动关闭连接
	//请求发送对列
	public static ConcurrentLinkedQueue<SendMsgReq> smsReqQueue = new ConcurrentLinkedQueue<SendMsgReq>();
	//请求发送对列信号
	public static Semaphore smsReqQueueSemp  = new Semaphore(0);
	public static Semaphore smsQpsSemp  = null;
	private final static AtomicInteger accessCount = new AtomicInteger(0); 
//	private static PushDataService pushDataService =SpringConfigTool.getBean("pushDataService");
	private static RedisDao redisDao=SpringConfigTool.getBean("redisDao");
//	private static RedisDao redisDao=null;
	private static String redis_suffix="smpp";   //redis后缀
	private static String SMS_REDIS_SEQ_SMPP="",SMS_REDIS_MSGID_SMPP="", SMS_REDIS_INCR_SMPP="", SMS_REDIS_REPORT_SMPP="";
	private static int SMS_CARRIER_TYPE_SMPP=0;  //运营商类型
	synchronized public static SmppContainer getInstance(){
		if(container==null){
			container=new SmppContainer();
		}
		return container;
	}
	private SmppContainer(){
		try {
			redis_suffix="smpp";
			int qps=100;
			smsQpsSemp  = new Semaphore(qps);
//			SMS_CARRIER_TYPE_SMPP=Integer.parseInt(ReadConfigation.getConfigItem("cmpp2_carrier_type"));
			host="47.88.131.110";//121.43.19.174 192.168.21.57 114.215.210.226 114.215.237.41  47.88.138.17
			port=8060; //8010  8060 9010
			host="192.168.177.46";//192.168.21.57  127.0.0.1
			//host="192.168.177.46";
			port=5000;//8008  5000
			//clientId="linkin"; //账号 //linkin  ytx
			//secret="g1nad31n";
			clientId="yanzys"; //账号 //linkin  yanzys    TEXT1
			secret="yanzy123"; //密码 //g1nad31n yanzy123 TEXT123
			//spCode="305510"; // 下发号码 //304015  305510  308647
			clientId="ytx";
			secret="ytx";
			//clientId="201630"; //账号 //linkin  yanzys    TEXT1
			//secret="111111"; //密码 //g1nad31n yanzy123 TEXT123
			//spCode="313475"; // 下发号码 //304015  305510  308647
			//spCode="2147483647";
			spId="";  //sp src
//			SMS_REDIS_SEQ_SMGPBJ   SMS_REDIS_MSGID_SMGPBJ SMS_REDIS_INCR_SMGPBJ SMS_REDIS_REPORT_SMGPBJ
//			SMS_CARRIER_TYPE_SMPP=Integer.parseInt(ReadConfigation.getConfigItem("smgp_carrier_type"));
			logger.info("【SMGP{}】host:{},port:{},clientId:{},secret:{},spCode:{},spId:{}",new Object[]{redis_suffix,host,port,clientId,secret,spCode,spId});
			SMS_REDIS_SEQ_SMPP=Constants.SMS_REDIS_SEQ_SMPP+redis_suffix;
			SMS_REDIS_MSGID_SMPP=Constants.SMS_REDIS_MSGID_SMPP+redis_suffix;
			SMS_REDIS_INCR_SMPP=Constants.SMS_REDIS_INCR_SMPP+redis_suffix;
			SMS_REDIS_REPORT_SMPP=Constants.SMS_REDIS_REPORT_SMPP+redis_suffix;
			init(host,port);
		} catch (IOException e) {
			logger.error("【smpp Socket链接短信网关失败】："+e.getMessage());
			e.printStackTrace();
		}
		releaseWork();
		new Thread(new ExeSendQueue()).start();
	}
	private boolean init(String host, int port) throws IOException{
		logger.info("【SMPP 创建Socket】host:{},port:{}",new Object[]{host,port});
		boolean flag=false;
		inStream=null;
		outStream=null;
		msgSocket=new Socket(host,port);
		logger.info("【smpp socket已建立】");
		msgSocket.setKeepAlive(true);
		msgSocket.setTcpNoDelay(true);// 数据不作缓冲，立即发送
		msgSocket.setSoLinger(true, 0);// socket关闭时，立即释放资源
		msgSocket.setTrafficClass(0x04 | 0x10);// 高可靠性和最小延迟传输
		
		inStream=msgSocket.getInputStream();
		outStream=msgSocket.getOutputStream();
		
			logger.info("【smpp 新建消息接收线程】");
			receiveThread = new Thread(new ReceiveWorker(),"smgpBjReceiveThread"); //启动消息接收线程
			receiveThread.start();
			flag=bind(clientId,secret);
			if(flag){
				logger.info("【smpp 链接成功】********************");
				isNetworkConnect=true; 
				closeFlag=false;
			}else{
				logger.warn("【smpp 链接失败】_____________________");
			}
			if(!isLaunchHeartcheck){
				logger.info("【smpp 启动链路检测任务】");
				launchHeartcheck();
			}
			isReConnection=false;
			return flag;
	}
	/**
	 * launchHeartcheck:链路检测 <br/>
	 * TODO(这里描述这个方法适用条件 – 可选).<br/>
	 * TODO(这里描述这个方法的执行流程 – 可选).<br/>
	 * TODO(这里描述这个方法的使用方法 – 可选).<br/>
	 * TODO(这里描述这个方法的注意事项 – 可选).<br/>
	 *
	 * @author LiHao
	 * @since JDK 1.6
	 */
	private void launchHeartcheck() {
		if(null==msgSocket||msgSocket.isClosed()||!msgSocket.isConnected())
			throw new IllegalStateException("socket is not established!");
		heartTimer = new Timer();
		isLaunchHeartcheck = true;
		heartTimer.schedule(new TimerTask() {
			public void run() {
				int count=0;
				boolean result=activityTest();
				while(!result){
					count++;
					logger.info("【smpp 链路检测失败】count:"+count);
					if(count>=Constants.TEST_CONN_COUNT){//如果再次链路检查次数超过两次则终止连接
						if(isReConnection){
							logger.info("【smpp 正在重连,退出检测】"); 
						}else{
							logger.warn("【smpp 链路检测失败次数超过,发起重连】");
							reConnection();
							
						}
						break;
					}
					result=activityTest();
				}
			}
		}, 1 * 15 * 1000, 1 * 30 * 1000);//1 * 15 * 1000
		//延迟60秒后执行 2分钟检测一次
	}
	private synchronized boolean reConnection(){
		if(!isNetworkConnect){
			isReConnection=true;
			logger.info("【smpp重连】重新建立与"+host+":"+port+"的连接");
			if(heartTimer!=null)
				heartTimer.cancel();
			isLaunchHeartcheck=false;
			isNetworkConnect=false;
			readFlag=false;
			if(receiveThread!=null&&!receiveThread.isInterrupted()){
				logger.info("【smpp重连】中断receiveThread线程");
				receiveThread.interrupt();
			}
				try {
					if(msgSocket!=null)
					msgSocket.close();
				} catch (IOException e1) {
					logger.error("【smpp 重连】关闭socket连接发生IO流异常"+e1);
					e1.printStackTrace();
				}
			 try {
				return init(host,port);
			} catch (IOException e) {
				logger.error("smpp 重连初始化异常"+e);
				e.printStackTrace();
				
			}
		}
		return true;
	}
	public boolean activityTest(){
		try {
			int sequenceId=getSequence();
			logger.info("【smpp 链路检查】"+sequenceId);
			SmppMsgHead head=new SmppMsgHead();
			head.setCommandId(SmppCommand.ENQUIRE_LINK);//标识创建连接
			head.setSequenceId(sequenceId);//序列，由我们指定
			
			List<byte[]> dataList=new ArrayList<byte[]>();
			dataList.add(head.toByteArry());
			Map<String, Object> map=sendReqMsg(dataList,sequenceId,true);
			if(map!=null){
			Integer requestId =(Integer)map.get("requestId");
			if(requestId==SmppCommand.ENQUIRE_LINK_RESP){
				SmppMsgHead resp =(SmppMsgHead)map.get("resp");
				logger.info("【smpp 链路检查成功】"+resp.getSequenceId());	
				return true;
			}
			}
			return false;
		} catch (Exception e) {
			logger.error("【smpp 链路检查异常】"+e.getMessage());	
			e.printStackTrace();
			return false;
		}
	}
	private boolean bind(String clientId,String secret){
		logger.info("【smpp 绑定SMGW】,clientId:{},secret:{}",new Object[]{clientId,secret});
		boolean flag=false;
		SmppMsgBind bind=new SmppMsgBind();
		bind.setSystemType("");
		int sequenceId=getSequence();
//		bind.setTotalLength(16+16+9+13+1+1+1+41);//消息总长度，级总字节数:4+4+4(消息头)+8+16+1+4+1(消息主体)
		bind.setCommandId(SmppCommand.BIND_TRANSCEIVER);//标识创建连接
		bind.setSequenceId(sequenceId);//序列，由我们指定
		bind.setVersion((byte)0x34);
		bind.setSystemId(clientId);
		bind.setPassword(secret);
		List<byte[]> dataList=new ArrayList<byte[]>();
		dataList.add(bind.toByteArry());
			logger.debug("【smpp bind ,sequenceId】:"+sequenceId);
			Map<String, Object> map=sendReqMsg(dataList,sequenceId,true);
			if(map!=null){
			Integer requestId =(Integer)map.get("requestId");
			if(requestId==SmppCommand.BIND_TRANSCEIVER_RESP){
				SmppMsgBindResp resp =(SmppMsgBindResp)map.get("resp");
				if(resp.getCommandStatus()==0){
					flag=true;
					logger.info("【smpp 登录成功】");
				}else{
					logger.warn("【smpp 登录失败】Status:{}",new Object[]{resp.getCommandStatus()});
				}
			}
			}
		return flag;
	}
	/**
	 * releaseWork:控制每秒并发. <br/>
	 * TODO(这里描述这个方法适用条件 – 可选).<br/>
	 * TODO(这里描述这个方法的执行流程 – 可选).<br/>
	 * TODO(这里描述这个方法的使用方法 – 可选).<br/>
	 * TODO(这里描述这个方法的注意事项 – 可选).<br/>
	 *
	 * @author LiHao
	 * @since JDK 1.6
	 */
	private void releaseWork() { // 每秒release一次  
		logger.info("【smpp】启动单秒release");
        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {  
            @Override  
            public void run() {  
            	int count=accessCount.get();
//            	logger.debug("【smgp releaseWork】count:"+count);
            	accessCount.set(0);  
            	smsQpsSemp.release(count); //信号量加上用了多少
            }  
        }, 1000, 1000, TimeUnit.MILLISECONDS);  
    }
	/**
	 * putSendMsg:放到发送请求对列，返回序号id. <br/>
	 * TODO(这里描述这个方法适用条件 – 可选).<br/>
	 * TODO(这里描述这个方法的执行流程 – 可选).<br/>
	 * TODO(这里描述这个方法的使用方法 – 可选).<br/>
	 * TODO(这里描述这个方法的注意事项 – 可选).<br/>
	 *
	 * @author LiHao
	 * @param req
	 * @return
	 * @since JDK 1.6
	 */
	public Map<String, String> putSendMsg(SendMsgReq req){
		Map<String, String> map=new HashMap<String,String>();
		if(isNetworkConnect){
			String respId=GenerateId.getUUID();
			req.setRespId(respId);
			logger.info("【smpp入对列】"+JacksonUtil.writeValueAsString(req));
			String[] cusMsisdn=req.getTel().split(","); 
			int len=cusMsisdn.length;
			if(len>1){
				logger.debug("【smpp群发入队列】,len:"+len);
				for (int i = 0; i < len; i++) {
					SendMsgReq reqClone= req.clone();
					reqClone.setTel(cusMsisdn[i]);
					smsReqQueue.offer(reqClone);
					smsReqQueueSemp.release(); //通知取队列发送
				}
			}else{
				smsReqQueue.offer(req);
				smsReqQueueSemp.release(); //通知取队列发送
			}
			map.put("respId", respId);
			map.put("result", "0");
		}else{
			map.put("respId", null);
			map.put("result", "-5");
			reConnection();
		}
		return map;
	}
	
	/**
	 * takeSendMsg:从发送对列取数据发送. <br/>
	 * TODO(这里描述这个方法适用条件 – 可选).<br/>
	 * TODO(这里描述这个方法的执行流程 – 可选).<br/>
	 * TODO(这里描述这个方法的使用方法 – 可选).<br/>
	 * TODO(这里描述这个方法的注意事项 – 可选).<br/>
	 *
	 * @author LiHao
	 * @since JDK 1.6
	 */
	public void takeSendMsg(){
		//判断队列信号量
		try {
				smsReqQueueSemp.acquire();
			} catch (InterruptedException e) {
				logger.error("【smgp】取发送队列数据时，信号量获取异常"+e.toString());
				e.printStackTrace();
			}
		if(!isNetworkConnect){
			reConnection();
		}
		
		if(!smsReqQueue.isEmpty()){
			smsQpsSemp.acquireUninterruptibly(1);
			accessCount.incrementAndGet(); 
			SendMsgReq req=smsReqQueue.poll();
			logger.info("【smgp Send msg】send,tel:"+req.getTel()+",respId:"+req.getRespId());
			sendMsg(req);
		}
	}
	private static void pushInfo(String info) {
//		String smgpNoticeUrl=ReadConfigation.getConfigItem("smgpNoticeUrl_bj");
//		pushDataService.postDataByAsynMode(smgpNoticeUrl, info,"smgp_bj");	
	}
	public void sendMsg(SendMsgReq req){
		String msg=req.getMsg();
		//msg="هو رمز تأكيد فيس بوك أو اذهب إلى ‏‎http://fb.com/‎‏【脸谱】";
		String[] cusMsisdn=req.getTel().split(","); 
		String appendCode=req.getAppendCode();
//		String sign=req.getSignature();
		logger.info("【smpp sendMsg】appendCode：{},cusMsisdn:{},msg:{}",new Object[]{appendCode,req.getTel(),msg});
		try{
			int sequenceId=getSequence();
			//redis保存seqId 和 返回短信网关id的关系
			Map<String,String> map=new HashMap<String,String>();
			map.put("respId", req.getRespId());
			map.put("outId", req.getOutId());
			map.put("seqId", sequenceId+"");
			logger.info("【smpp redis保存seqid_respid】,key:{},respId:{},outId:{},seqId:{}",new Object[]{SMS_REDIS_SEQ_SMPP+sequenceId,req.getRespId(),req.getOutId(),sequenceId});

			redisDao.setMapExpire(SMS_REDIS_SEQ_SMPP+sequenceId, map, Constants.SMS_EXPIRE_SECONDS);
			byte msgByte[]=msg.getBytes("UnicodeBigUnmarked");
					//encodeString(msg)
					//msg.getBytes("UnicodeBigUnmarked");
			if(msgByte.length<=140){//短短信
				sendShortMsg(msgByte,cusMsisdn, appendCode,sequenceId);
			}else{//长短信
				sendLongMsg(msgByte,cusMsisdn,appendCode,sequenceId);
			}

		}catch(Exception e){
			logger.error("【smpp sendMsg】error"+e.getMessage());
			e.printStackTrace();
		}
	}
	/**
	 * 发送短短信
	 * @return
	 */
	private void sendShortMsg(byte msgByte[],String[] cusMsisdn,String appendCode,int sequenceId){
		try {		
			SmppMsgSubmit submit=new SmppMsgSubmit();
			submit.setCommandId(SmppCommand.SUBMIT_SM);
			submit.setSequenceId(sequenceId);
			submit.setRegisteredDelivery((byte)1);
			submit.setSourceAddr(appendCode);//扩展码
			submit.setDestinationAddr(cusMsisdn[0]);
			submit.setDataCoding((byte)0x08);
			submit.setSmLength((byte)(msgByte.length));
			submit.setShortMessage(msgByte);
			List<byte[]> dataList=new ArrayList<byte[]>();
			dataList.add(submit.toByteArry());

			logger.info("【smpp向{}下发短短信】序列号为:{}",new Object[]{cusMsisdn[0],sequenceId});
			sendReqMsg(dataList,sequenceId,false);

		} catch (Exception e) {
			logger.error("【smgp发送短短信异常】"+e.getMessage());
			e.printStackTrace();
		}
	}
	
	/**
	 * 发送长短信
	 * @return
	 */
	private void sendLongMsg(byte[] allByte,String[] cusMsisdn,String appendCode,int sequenceId){
		logger.debug("【smpp发送长短信】"+cusMsisdn[0]);
		try {

			List<byte[]> dataList=new ArrayList<byte[]>();
			int msgLength=allByte.length;
			int maxLength=140;
			int msgSendCount=msgLength%(maxLength-6)==0?msgLength/(maxLength-6):msgLength/(maxLength-6)+1;
			//短信息内容头拼接
			byte[] msgHead=new byte[6];
			msgHead[0]=0x05;
			msgHead[1]=0x00;
			msgHead[2]=0x03;
			msgHead[3]=(byte)sequenceId;
			msgHead[4]=(byte)msgSendCount;
			msgHead[5]=0x01;
			logger.debug("【smpp发送长短信】msgSendCount:"+msgSendCount);
			for(int i=0;i<msgSendCount;i++){
				//msgHead[3]=(byte)MsgUtils.getSequence();
				msgHead[5]=(byte)(i+1);
				byte[] needMsg=null;
				//消息头+消息内容拆分
				if(i!=msgSendCount-1){
					int start=(maxLength-6)*i;
					int end=(maxLength-6)*(i+1);
					needMsg=MsgUtils.getMsgBytes(allByte,start,end);
				}else{
					int start=(maxLength-6)*i;
					int end=allByte.length;
					needMsg=MsgUtils.getMsgBytes(allByte,start,end);
				}
				int subLength=needMsg.length+msgHead.length;
				logger.debug("【smpp发送长短信】subLength:"+subLength);
				byte[] sendMsg=new byte[needMsg.length+msgHead.length];
				System.arraycopy(msgHead,0,sendMsg,0,6);
				System.arraycopy(needMsg,0,sendMsg,6,needMsg.length);
				SmppMsgSubmit submit=new SmppMsgSubmit();
				submit.setCommandId(SmppCommand.SUBMIT_SM);
				submit.setSequenceId(sequenceId+i);
				submit.setEsmClass((byte)0x40);
				submit.setRegisteredDelivery((byte)1);
				submit.setSourceAddr(appendCode);//扩展码
				submit.setDestinationAddr(cusMsisdn[0]);
				submit.setDataCoding((byte)0x08);
				submit.setSmLength((byte)subLength);
				submit.setShortMessage(sendMsg);
				dataList.add(submit.toByteArry());
			}
			logger.info("【smpp向{}下发长短信】序列号为:{}",new Object[]{cusMsisdn[0],sequenceId});
			sendReqMsg(dataList,sequenceId,false);

		} catch (Exception e) {
			logger.info("【smgp向{}下发长短信,异常】序列号为:{},error:{}",new Object[]{cusMsisdn[0],sequenceId,e.getMessage()});
			e.printStackTrace();
		}
	}
	@SuppressWarnings("unchecked")
	public Map<String, Object> sendReqMsg(List<byte[]> dataList,int sequenceId,boolean syncFlag) {
		if(dataList ==null) {   
			return null;
		}
		try{
		if(syncFlag){
			Condition msglock = lock.newCondition(); //消息锁  
	        recMsgMap.put(sequenceId, msglock);
			for(byte[] data:dataList){
				outStream.write(data);
				outStream.flush();
		    }
	        try {  
	            lock.lock();
	            logger.debug("【smgp】lock.lock()"+sequenceId);
	            msglock.await(timeout,TimeUnit.MILLISECONDS);  //等待
	            logger.debug("【smgp】msglock.await"+sequenceId);
	        } catch (InterruptedException e) {  
	        	logger.error("发送线程中断",e); 
	        	e.printStackTrace();
	        } finally {  
	            lock.unlock();  
	        }  

	        Object respMsg = recMsgMap.remove(sequenceId); //响应信息  
	        if(respMsg!=null){
		        if((respMsg != msglock)) { 
		        	logger.debug("【smgp】return respMsg");
		        	return (Map<String, Object>) respMsg;
		        } else {  
		        	logger.warn("【smgp】msglock响应超时"+sequenceId+"未收到消息");  
//		            throw new SocketTimeoutException(sequenceId+" smgp超时，未收到响应消息");  
		        	return null;
		        }
	        }else{
	        	logger.warn("【smgp】respMsgnull响应超时"+sequenceId+"未收到消息"); 
//	        	throw new SocketTimeoutException(sequenceId+" smgp respMsg null响应超时，未收到响应消息"); 
	        	return null;
	        }
		}else{
			for(byte[] data:dataList){
				outStream.write(data);
				outStream.flush();
		    }
			return null;
		}
		}catch(IOException e){
			e.printStackTrace();
			logger.error("【smgp】发送短消息时异常,需重连,isNetworkConnect:"+isNetworkConnect+",ex:"+e.getMessage());
			isNetworkConnect=false;
			logger.error("【smgp sendReqMsg 置isNetworkConnect为】"+isNetworkConnect);
			reConnection();
			return null;
		}
	}

	private synchronized static int getSequence(){
		Long seqId=redisDao.incrRes(SMS_REDIS_INCR_SMPP, Constants.SEQ_MAX, Constants.SEQ_MIN+"");
		return seqId.intValue();
	}
	class ExeSendQueue implements Runnable{
		public void run() {
			logger.info("【smpp】启动从队列获取消息发送线程");
			while(true){
				takeSendMsg();
			}
		}
	}
	private class ReceiveWorker implements Runnable {
		
		public void run() {
			readFlag=true; 
			logger.info("【smpp 启动消息接收线程】,readFlag:"+readFlag);
			while (readFlag) {
				try {
					byte [] headBytes = new byte[SmppCommand.MsgHeaderLength];
					byte totalLenBytes[]=new byte[SmppCommand.MsgTotalLength];
					byte requestIdBytes[]=new byte[SmppCommand.MsgCommandId];
					byte commandStatusBytes[]=new byte[SmppCommand.MsgCommandStatus];
					byte sequenceIdBytes[]=new byte[SmppCommand.MsgSequenceId];
					
					int headLen=inStream.read(headBytes);
					if (headLen == -1) {
//						logger.warn("【smgp 读到流未尾，对方已关闭流!,发起重连】");
						continue;
					}else{
						System.arraycopy(headBytes,0,totalLenBytes,0,4);
						System.arraycopy(headBytes,4,requestIdBytes,0,4);
						System.arraycopy(headBytes,8,commandStatusBytes,0,4);
						System.arraycopy(headBytes,12,sequenceIdBytes,0,4);
					}
					int totalLen=ByteConvert.bytesToInt(totalLenBytes);
					if(totalLen<SmppCommand.MsgHeaderLength){
						logger.debug("【smpp读数据时totalLen异常】,totalLen:{}",new Object[]{totalLen});
						continue;
					}
					byte bodyBytes[]=new byte[totalLen-SmppCommand.MsgHeaderLength];
					int bodyLen=inStream.read(bodyBytes);
					int requestId=ByteConvert.bytesToInt(requestIdBytes);
					int commandStatus=ByteConvert.bytesToInt(commandStatusBytes);
					int sequenceId=ByteConvert.bytesToInt(sequenceIdBytes);
					
					logger.debug("【smpp读到数据】,totalLen:{},bodyLen:{},requestId:{},commandStatus:{},sequenceId:{}",new Object[]{totalLen,bodyLen,requestId,commandStatus,sequenceId});
					Map<String, Object> map = new HashMap<String, Object>();
	                map.put("requestId", requestId);
					switch(requestId){
						case SmppCommand.BIND_TRANSCEIVER_RESP:
							SmppMsgBindResp connectResp=new SmppMsgBindResp(totalLen, requestId, commandStatus,sequenceId, bodyBytes);
							map.put("resp", connectResp);
							logger.info("【smpp链接短信网关回值】sequenceId:{},状态:{},序列号:{}",new Object[]{connectResp.getSequenceId(),connectResp.getCommandStatus(),connectResp.getSystemId()});
							break;
						case SmppCommand.SUBMIT_SM_RESP:
							SmppMsgSubmitResp submitResp=new SmppMsgSubmitResp(totalLen, requestId, commandStatus,sequenceId,bodyBytes);
							//map.put("resp", submitResp);
							logger.info("【smpp向用户下发短信回值】状态码:{},序列号:{},msgId:{}",new Object[]{commandStatus,submitResp.getSequenceId(),submitResp.getMessageId()});
							//根据序号取返回给短信网关的消息id

							Map<String,String> respMap=redisDao.getMap(SMS_REDIS_SEQ_SMPP+sequenceId);
							//存redis 有效期24小时 msgid 序列号
							if(!respMap.isEmpty()){
								logger.info("【smpp redis保存msgid_respid】key:{},respId:{},outId:{},seqId:{}",new Object[]{SMS_REDIS_MSGID_SMPP+submitResp.getMessageId(),respMap.get("respId"),respMap.get("outId"),respMap.get("seqId")});
								redisDao.setMapExpire(SMS_REDIS_MSGID_SMPP+submitResp.getMessageId(), respMap, Constants.SMS_EXPIRE_SECONDS);
								//避免状态报告先回,从redis取一次
								Map<String,String> reportMap=redisDao.getMap(SMS_REDIS_REPORT_SMPP+submitResp.getMessageId());
								//如果有推送report
								if(!reportMap.isEmpty()){
									SendMsgReport report=new SendMsgReport(1,SMS_CARRIER_TYPE_SMPP);
									report.setTel(reportMap.get("tel"));
									report.setStat(reportMap.get("stat"));
									report.setSubmitTime(reportMap.get("submitTime"));
									report.setDoneTime(reportMap.get("doneTime"));
									report.setCarrierMsgId(reportMap.get("carrierMsgId"));
									report.setMsgId(respMap.get("respId"));
									report.setSeqId(respMap.get("seqId"));
									report.setOutId(respMap.get("outId"));
									report.setCarDoneTime(reportMap.get("doneTime"));
									//异步推送
									String reportStr=JacksonUtil.writeValueAsString(report);
									logger.info("【smgpBJ 推送先到的状态报告】"+reportStr);
									pushInfo(reportStr);
									//report批量入库 放对列
//									SgwQueue.putReportQueue(report);
								}
							}else{
								logger.warn("【smgpBj redis保存msgid_respid】未取到序号"+SMS_REDIS_SEQ_SMPP+sequenceId+"对应的respId");
							}
							break;
						case SmppCommand.ENQUIRE_LINK:
							logger.info("【smgp短信网关收到SMGW发送的链路检测】CMD_SMPP_ACTIVE_TEST 序列号："+sequenceId);
							SmppMsgHead msgActiveResp=new SmppMsgHead();
							msgActiveResp.setCommandId(SmppCommand.ENQUIRE_LINK_RESP);
							msgActiveResp.setSequenceId(sequenceId);
							//进行回复
							outStream.write(msgActiveResp.toByteArry());
							outStream.flush();
							break;
						case SmppCommand.ENQUIRE_LINK_RESP:
							logger.info("【smgp短信网关收到SMGW的链路检测响应】CMD_SMPP_ACTIVE_TEST_RESP 序列号："+sequenceId);
							SmppMsgHead linkResp=new SmppMsgHead(totalLen, requestId, commandStatus,sequenceId,bodyBytes);
							map.put("resp", linkResp);
							/*SmppUnbind unbind = new SmppUnbind();
							unbind.setCommandId(SmppCommand.UNBIND);
							unbind.setSequenceId(getSequence());
							outStream.write(unbind.toByteArray());
							outStream.flush();*/
							break;
						case SmppCommand.DELIVER_SM:
							logger.debug("【smpp接收到SMGW DELIVER 消息】");
							SmppMsgDeliver msgDeliver=new SmppMsgDeliver(totalLen, requestId, commandStatus,sequenceId,bodyBytes);
							if(msgDeliver.getResult()==0){
//								logger.info("【smgp DELIVER 消息】序列号："+sequenceId+",是否状态报告回复"+(msgDeliver.getEsmClass()==4?"是,目的手机号："+msgDeliver.getsSrcTermID()+",msgId:"+msgDeliver.getsMsgID()+",reportId:"+msgDeliver.getMsg_Id_report()));
								if(msgDeliver.getEsmClass()==4){
									//状态报告 1：状态报告
									SendMsgReport report=new SendMsgReport(1,SMS_CARRIER_TYPE_SMPP);
									report.setTel(msgDeliver.getDestAddr());
									report.setStat(msgDeliver.getMsgState()+"");
//									report.setSubmitTime(msgDeliver.getSubmit_time());
//									report.setDoneTime(msgDeliver.getDone_time());
									report.setCarrierMsgId(msgDeliver.getMsgIdReport());
//									report.setCarDoneTime(msgDeliver.getDone_time());
									//需要根据状态报告的msgId取对应的返回网关的id
									String reportId=SMS_REDIS_MSGID_SMPP+msgDeliver.getMsgIdReport();
									Map<String, String> rpMap=redisDao.getMap(reportId);
									if(!rpMap.isEmpty()){
										report.setMsgId(rpMap.get("respId"));
										report.setSeqId(rpMap.get("seqId"));
										report.setOutId(rpMap.get("outId"));
										//异步推送
										String reportStr=JacksonUtil.writeValueAsString(report);
										logger.info("【smgpBJ 推送状态报告】"+reportStr);
										pushInfo(reportStr);
										//report批量入库 放对列
//										SgwQueue.putReportQueue(report);
									}else{
										logger.warn("【smgp redis状态报告】没取到reportId:{},tel:{},stat:{},submitTime:{},doneTime:{}",new Object[]{reportId,report.getTel(),report.getStat(),report.getSubmitTime(),report.getDoneTime()});
										//有可能submitResp未收到 入redis
//										Map<String,String> reportMap=new HashMap<String,String>();
//										reportMap.put("tel", msgDeliver.getDestAddr().trim());
//										reportMap.put("stat", msgDeliver.getStat().trim());
//										reportMap.put("submitTime", msgDeliver.getSubmit_time());
//										reportMap.put("doneTime", msgDeliver.getDone_time());
//										reportMap.put("carrierMsgId", msgDeliver.getMsg_Id_report());
//										redisDao.setMapExpire(SMS_REDIS_REPORT_SMPP+msgDeliver.getMsg_Id_report(), reportMap, Constants.SMS_EXPIRE_REPORT);
									}
								}else{
									//上行消息 0：非状态报告
									SendMsgMo mo=new SendMsgMo(0);
									mo.setCodePrefix(spCode);
									mo.setTel(msgDeliver.getSourceAddr());
									mo.setMoCode(msgDeliver.getDestAddr());
									mo.setContent(msgDeliver.getShortMessage());
									mo.setSubmitTime(DateUtils.formatMdhmsDate(new Date()));
									//异步推送
									String moStr=JacksonUtil.writeValueAsString(mo);
									logger.info("【smpp 推送上行消息】"+moStr);
									pushInfo(moStr);
								}
							}else{
								logger.warn("【smpp DELIVER 接收 解析失败】序列号："+sequenceId);
								break;
							}
							SmppMsgDeliverResp msgDeliverResp=new SmppMsgDeliverResp();
							msgDeliverResp.setCommandId(SmppCommand.DELIVER_SM_RESP);
							msgDeliverResp.setSequenceId(sequenceId);
							msgDeliverResp.setCommandStatus(0);
//							logger.debug("sequenceId:"+sequenceId+"msgDeliver.getsMsgID():"+msgDeliver.getsMsgID());
							//进行回复
							byte[] b=msgDeliverResp.toByteArry();
							logger.debug("b:"+b.length);
							outStream.write(b);
							outStream.flush();
							break;
						case SmppCommand.UNBIND_RESP:
							System.out.println("dsfjldsjf_UNBIND_RESP");
							break;
						default:
							logger.error("【smpp无法解析返回的包结构】包长度为:"+totalLen);
							break;
					}
					if(requestId==SmppCommand.SUBMIT_SM||requestId==SmppCommand.DELIVER_SM){
						logger.debug("【smgp 收到"+requestId+"继续】");
						continue;
					}
					logger.debug("【smgp get sequenceId】"+sequenceId);
					Object recObj=recMsgMap.get(sequenceId);
					if(recObj==null){
						logger.warn("【smgp recObj】sequenceId:{},requestId:{}可能已被注销!接收到消息丢弃",new Object[]{sequenceId,requestId});  
						continue;
					}
					String recObjSimpleName=recObj.getClass().getSimpleName();
					Condition msglock =null;
					logger.debug("【smgp】getSimpleName:"+recObjSimpleName);
					if(recObjSimpleName.equals("ConditionObject")){
						msglock =(Condition) recObj;
						recMsgMap.put(sequenceId, map);  
					}else{
						logger.warn("【smgp】sequenceId:"+sequenceId+"非ConditionObject接收到消息丢弃");  
						continue;
					}

	                try{  
	                	lock.lock();  
	                    msglock.signalAll();  //唤醒
	                    logger.debug("【smgp】msglock.signalAll()");
	                }finally {
	                    lock.unlock();  
	                    logger.debug("【smgp】lock.unlock()");
	                }
				}catch(IOException e){
					logger.error("【smgp读消息IO错误】"+e.getMessage());
					readFlag=false;
					isNetworkConnect=false;
					reConnection();
					e.printStackTrace();
				}catch(Exception e){
					logger.error("【smpp读消息错误】"+e.getMessage());
					e.printStackTrace();
				}
			}//while结束
		}
	}
	public static void main(String[] args) {
		SmppContainer.getInstance();
	}
	
	 public static final int EXTENDED_ESCAPE = 0x1b;

	    /** Page break (extended table). */
	    public static final int PAGE_BREAK = 0x0a;
	
	 public byte[] encodeString(String s) {
	        if (s == null) {
	            return new byte[0];
	        }

	        char[] c = s.toCharArray();
	        ByteArrayOutputStream enc = new ByteArrayOutputStream(256);

	        for (int loop = 0; loop < c.length; loop++) {
	            int search = 0;
	            for (; search < CHAR_TABLE.length; search++) {
	                if (search == EXTENDED_ESCAPE) {
	                    continue;
	                }

	                if (c[loop] == CHAR_TABLE[search]) {
	                    enc.write((byte) search);
	                    break;
	               }

	                if (c[loop] == EXT_CHAR_TABLE[search]) {
	                    enc.write((byte) EXTENDED_ESCAPE);
	                    enc.write((byte) search);
	                    break;
	               }
	            }
	            if (search == CHAR_TABLE.length) {
	                // A '?' character.
	                enc.write(0x3f);
	            }
	        }

	        return enc.toByteArray();
	    }
	 
	 private static final char[] CHAR_TABLE = {
	        '@', '\u00a3', '$', '\u00a5', '\u00e8', '\u00e9', '\u00f9', '\u00ec',
	        '\u00f2', '\u00c7', '\n', '\u00d8', '\u00f8', '\r', '\u00c5', '\u00e5',
	        '\u0394', '_', '\u03a6', '\u0393', '\u039b', '\u03a9', '\u03a0', '\u03a8',
	        '\u03a3', '\u0398', '\u039e', ' ', '\u00c6', '\u00e6', '\u00df', '\u00c9',
	        ' ', '!', '"', '#', '\u00a4', '%', '&', '\'',
	        '(', ')', '*', '+', ',', '-', '.', '/',
	        '0', '1', '2', '3', '4', '5', '6', '7',
	        '8', '9', ':', ';', '<', '=', '>', '?',
	        '\u00a1', 'A', 'B', 'C', 'D', 'E', 'F', 'G',
	        'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O',
	        'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W',
	        'X', 'Y', 'Z', '\u00c4', '\u00d6', '\u00d1', '\u00dc', '\u00a7',
	        '\u00bf', 'a', 'b', 'c', 'd', 'e', 'f', 'g',
	        'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o',
	        'p', 'q', 'r', 's', 't', 'u', 'v', 'w',
	        'x', 'y', 'z', '\u00e4', '\u00f6', '\u00f1', '\u00fc', '\u00e0',
	    };
	 
	 private static final char[] EXT_CHAR_TABLE = {
         0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, '^', 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0,
         '{', '}', 0, 0, 0, 0, 0, '\\',
         0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, '[', '~', ']', 0,
         '|', 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, '\u20ac', 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0,
 };
	 
	 
}

